// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// bmqa_message.h                                                     -*-C++-*-
#ifndef INCLUDED_BMQA_MESSAGE
#define INCLUDED_BMQA_MESSAGE

//@PURPOSE: Provide the application with a message data object.
//
//@CLASSES:
//  bmqa::Message:                   message received from/sent to a queue
//  bmqa::MessageConfirmationCookie: cookie for async message confirmation
//
//@SEE_ALSO:
//  bmqa::MessageEvent       : Mechanism for data event notification.
//  bmqa::MessageEventBuilder: Builder for 'bmqa::MessageEvent'
//
//@DESCRIPTION: A 'bmqa::Message' represents the data message to put on a
// queue, or retrieved from a queue.  It is composed of the following fields:
//
//: o A message GUID, which is a printable string generated by the broker to
//:    uniquely identify this message.
//:
//: o A correlation Id, which is a user-provided identifier for the message.
//:
//: o A queue Id, to map with the queue this message is associated with.
//:
//: o The payload, which is opaque to the framework. At some point, framework
//:   may provide utilities to encode and decode schema messages using various
//:   CODECs.  For example, a JavaScript publisher may publish a message into
//:   a queue using a JSON object as payload, and the consumer may be receiving
//:   that payload as a BER-encoded schema object.
//:
//: o At some point, system properties such as version, encoding, timestamp,
//:   priority, message group and "reply-to" will be supported. System
//:   properties will be used by the broker; for example to deliver
//:   high-priority messages first or to filter based on a minimum version.
//:
//: o Application-defined message properties, where properties are a list of
//:   name-value pairs (see 'setPropertiesRef', 'hasProperties' and
//:   'loadProperties').
//
// A 'bmqa::MessageConfirmationCookie' is a small object which allows the
// application to confirm a 'bmqa::Message' asynchronously without having to
// hold on to the entire message.  This can be useful when, for example, the
// message is decoded in the event handler, and the resulting object is
// enqueued for asynchronous processing, along with that small cookie object
// for confirming the message once it has been successfully processed.

// BMQ

#include <bmqa_queueid.h>
#include <bmqt_compressionalgorithmtype.h>
#include <bmqt_correlationid.h>
#include <bmqt_messageguid.h>
#include <bmqt_subscription.h>

// BDE
#include <bdlbb_blob.h>
#include <bsl_cstddef.h>  // for 'size_t'
#include <bsl_iosfwd.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bslma_allocator.h>
#include <bsls_annotation.h>

namespace BloombergLP {

// FORWARD DECLARATION
namespace bmqimp {
class Event;
}

namespace bmqa {

// FORWARD DECLARATION
class MessageProperties;

// ==================
// struct MessageImpl
// ==================

/// Struct containing the internal (private) members of `Message`.  Keeping
/// them in a separate struct makes it possible to access the private state
/// of a `Message` in order to initialize it, without having to expose those
/// members publicly.
///
/// IMPLEMENTATION NOTE: If adding new data members to this struct that are
///                      lazily populated (such as `queueId` or
///                      `correlationId`), then they should be reset in
///                      `bmqa::MessageIterator::nextMessage()`.
struct MessageImpl {
    // PUBLIC DATA
    bmqimp::Event* d_event_p;
    // Pointer to the Event this message is
    // associated with

    bsl::shared_ptr<bmqimp::Event> d_clonedEvent_sp;
    // May point to a bmqimp::Event (in case
    // this Message is a clone)

    bmqa::QueueId d_queueId;
    // QueueId this message is associated
    // with

    bmqt::CorrelationId d_correlationId;
    // CorrelationId this message is
    // associated with

    bmqt::SubscriptionHandle d_subscriptionHandle;
    // SubscriptionHandle this message is
    // associated with

#ifdef BMQ_ENABLE_MSG_GROUPID
    bsl::string d_groupId;
    // Optional Group Id this message is
    // associated with (only present when
    // 'BMQ_ENABLE_MSG_GROUPID' is defined)
#endif
};

// ===============================
// class MessageConfirmationCookie
// ===============================

/// Cookie for async message confirmation.  The cookie stores the queue ID
/// and the message GUID of a message, so that the message can be confirmed
/// later without having to hold on to the entire `bmqa::Message`.
class MessageConfirmationCookie {
  private:
    // DATA
    bmqa::QueueId d_queueId;
    // QueueId associated with this cookie

    bmqt::MessageGUID d_guid;
    // GUID associated with this cookie

  public:
    // CREATORS

    /// Create an unset instance of this class.
    MessageConfirmationCookie();

    /// Create an instance with the specified `queueId` and `messageGUID`.
    /// Users should not use this constructor directly, but rather obtain
    /// the message cookie from an existing `bmqa::Message` with the
    /// `bmqa::Message::confirmationCookie` accessor.
    MessageConfirmationCookie(const QueueId&           queueId,
                              const bmqt::MessageGUID& messageGUID);

    // ACCESSORS

    /// Return the queue ID of the message with which this confirmation
    /// cookie is associated.
    const QueueId& queueId() const;

    /// Return the message GUID of the message with which this confirmation
    /// cookie is associated.
    const bmqt::MessageGUID& messageGUID() const;
};

// =============
// class Message
// =============

/// A message sent/received to/from the BlazingMQ broker.
class Message {
#ifdef BMQ_ENABLE_MSG_GROUPID
  public:
    // CONSTANTS

    /// Constant representing the maximum length of a Group Id string.
    static const int k_GROUP_ID_MAX_LENGTH = 31;
#endif

  private:
    // DATA
    mutable MessageImpl d_impl;
    // Internal state of this message, held by value.  Declared 'mutable',
    // presumably so that 'const' accessors can lazily populate fields such
    // as the queueId or correlationId -- TODO confirm against the
    // implementation file.

  private:
    // PRIVATE ACCESSORS

    /// Return true if the message has been initialized with an underlying
    /// event (and thus is valid), and false otherwise.  Any operation
    /// except assignment or destruction on an uninitialized message is an
    /// error.
    bool isInitialized() const;

  public:
    // CREATORS

    /// Create an invalid message having no content.  The only valid
    /// operations on an invalid message instance are assignment and
    /// destruction.
    Message();

    // MANIPULATORS

    /// Set the payload of this message to the blob pointed to by the
    /// specified `data`.  Behavior is undefined unless `data` is non-null
    /// and payload's length is greater than zero.  Note that payload
    /// pointed to by `data` is *not* copied right away, and should not be
    /// destroyed or modified until this message has been packed (see
    /// `bmqa::MessageEventBuilder` component level documentation for
    /// correct usage).
    ///
    /// This method is deprecated, please use `setDataRef()` instead.
    Message& setData(const bdlbb::Blob* data) BSLS_ANNOTATION_DEPRECATED;

    /// Set the payload of this message to the specified `length` bytes
    /// starting at the specified `data` address.  The behavior is undefined
    /// unless `data` is non-null and `length` is greater than zero.  Note
    /// that payload pointed to by `data` is *not* copied right away, and
    /// should not be destroyed or modified until this message has been
    /// packed (see `bmqa::MessageEventBuilder` component level
    /// documentation for correct usage).
    ///
    /// This method is deprecated, please use `setDataRef()` instead.
    Message& setData(const char* data,
                     size_t      length) BSLS_ANNOTATION_DEPRECATED;

    /// Set the payload of this message to the blob pointed to by the
    /// specified `data`.  Behavior is undefined unless `data` is non-null
    /// and payload's length is greater than zero.  Note that payload
    /// pointed to by `data` is *not* copied right away, and should not be
    /// destroyed or modified until this message has been packed (see
    /// `bmqa::MessageEventBuilder` component level documentation for
    /// correct usage).
    Message& setDataRef(const bdlbb::Blob* data);

    /// Set the payload of this message to the specified `length` bytes
    /// starting at the specified `data` address.  The behavior is undefined
    /// unless `data` is non-null and `length` is greater than zero.  Note
    /// that payload pointed to by `data` is *not* copied right away, and
    /// should not be destroyed or modified until this message has been
    /// packed (see `bmqa::MessageEventBuilder` component level
    /// documentation for correct usage).
    Message& setDataRef(const char* data, size_t length);

    /// Set the properties of this message to the `MessageProperties`
    /// instance pointed to by the specified `properties`.  Behavior is
    /// undefined unless `properties` is non-null.  Note that properties are
    /// *not* copied right away, and should not be destroyed or modified
    /// until this message has been packed (see `bmqa::MessageEventBuilder`
    /// component level documentation for correct usage).
    Message& setPropertiesRef(const MessageProperties* properties);

    /// Clear out any properties associated with this message.  Note that if
    /// there are no properties associated with this message, this method
    /// has no effect.  Also note that the associated `MessageProperties`
    /// instance, if any, is not modified; it's simply dissociated from this
    /// message.
    Message& clearPropertiesRef();

    /// Set correlation ID of this message to the specified `correlationId`.
    Message& setCorrelationId(const bmqt::CorrelationId& correlationId);

    /// Set the Compression algorithm type of the current message to the
    /// specified `value` and return a reference offering modifiable access
    /// to this object.
    Message&
    setCompressionAlgorithmType(bmqt::CompressionAlgorithmType::Enum value);

#ifdef BMQ_ENABLE_MSG_GROUPID
    /// Set Group Id of this message to the specified `groupId`.  The
    /// `groupId` must be a null-terminated string with up to
    /// `Message::k_GROUP_ID_MAX_LENGTH` characters (excluding the
    /// terminating '\0').  The behavior is undefined if the message is not
    /// a started, `PUT` message or if `groupId` is empty.
    Message& setGroupId(const bsl::string& groupId);

    /// Clear the Group Id of this message.  The behavior is undefined if
    /// the message is not a started, `PUT` message.
    Message& clearGroupId();
#endif

    // ACCESSORS

    /// Return true if the message is valid, and false otherwise.  Any
    /// operation except assignment or destruction on an invalid message is
    /// an error.
    ///
    /// This method is deprecated.
    bool isValid() const;  // TBD:BSLS_ANNOTATION_DEPRECATED

    /// Same as `isValid`.
    ///
    /// This method is deprecated.
    operator bool() const;  // TBD:BSLS_ANNOTATION_DEPRECATED

    /// Return a copy of this message, using the optionally specified
    /// `basicAllocator` with the copy holding all the data of this instance
    /// and not backed by any `MessageEvent`.  Note that this operation
    /// does *not* copy underlying data.
    Message clone(bslma::Allocator* basicAllocator = 0) const;

    /// Return the queue ID of the queue with which this message is
    /// associated.  The behavior is undefined unless this instance
    /// represents a `PUT`, `PUSH` or `ACK` message.
    const bmqa::QueueId& queueId() const;

    /// Return the correlation Id associated with this message.  The
    /// behavior is undefined unless this instance represents a `PUT` or an
    /// `ACK`, or a `PUSH` message.  Note that in case of failure to accept
    /// a `PUT` message, BlazingMQ sends an `ACK` message with failed
    /// status, even if an `ACK` message was not requested by the
    /// application (i.e., no correlationId was specified when posting the
    /// message).  In such cases, correlationId associated with the `ACK`
    /// message will be unset, and as such, application *must* check for
    /// that by invoking `isUnset` on the returned correlationId object.  In
    /// the case of a `PUSH` message, the return value is the one specified
    /// as the correlation id of the corresponding Subscription.  Invoking
    /// `thePointer`, `theNumeric` or `theSharedPtr` on an unset
    /// correlationId instance will lead to undefined behavior.
    const bmqt::CorrelationId& correlationId() const;

    /// Return the subscription handle associated with this message.  The
    /// behavior is undefined unless this instance represents a `PUSH`
    /// message.  The return value is the one specified as the subscription
    /// handle of the corresponding Subscription.
    const bmqt::SubscriptionHandle& subscriptionHandle() const;

    /// Return Compression algorithm type of the current message. Behavior
    /// is undefined unless this instance represents a `PUT` or `PUSH`
    /// message.
    bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType() const;

#ifdef BMQ_ENABLE_MSG_GROUPID
    /// Return the Group Id associated with this message.  The behavior is
    /// undefined unless this instance represents a `PUT` or a `PUSH`
    /// message and `hasGroupId()` returns `true`.
    const bsl::string& groupId() const;
#endif

    /// Return the unique message Id generated by the SDK for this message.
    /// The behavior is undefined unless this instance represents a `PUT`,
    /// a `PUSH` or an `ACK` message.
    const bmqt::MessageGUID& messageGUID() const;

    /// Return a cookie which can be used to confirm this message.  The
    /// behavior is undefined unless this instance represents a `PUSH`
    /// message.
    MessageConfirmationCookie confirmationCookie() const;

    /// Return the status of the `ACK` message.  The behavior is undefined
    /// unless this instance represents an `ACK` message.  This value
    /// corresponds to the `bmqt::AckResult::Enum` enum.
    int ackStatus() const;

    /// Load into the specified `blob` the payload of the message, if any.
    /// Return zero if the message has a payload and non-zero value
    /// otherwise.  The behavior is undefined unless this instance
    /// represents a `PUT` or `PUSH` message.  Note that for efficiency,
    /// application should fetch payload once and cache the value, instead
    /// of invoking this method multiple times on a message.
    int getData(bdlbb::Blob* blob) const;

    /// Return the number of bytes in the payload.  The behavior is
    /// undefined unless this instance represents a `PUT` or a `PUSH`
    /// message.  Note that for efficiency, application should fetch payload
    /// size once and cache the value, instead of invoking this method
    /// multiple times on a message.
    int dataSize() const;

    /// Return `true` if this instance has at least one message property
    /// associated with it, `false` otherwise.  Behavior is undefined unless
    /// this instance represents a `PUT` or a `PUSH` message.
    bool hasProperties() const;

#ifdef BMQ_ENABLE_MSG_GROUPID
    /// Return whether this message has an associated GroupId set.  The
    /// behavior is undefined unless this instance represents a `PUT` or a
    /// `PUSH` message.
    bool hasGroupId() const;
#endif

    /// Load into the specified `buffer` the properties associated with this
    /// message.  Return zero on success, and a non-zero value otherwise.
    /// Behavior is undefined unless this instance represents a `PUT` or a
    /// `PUSH` message, and unless `buffer` is non-null.  Note that if there
    /// are no properties associated with this message, zero will be
    /// returned and the `MessageProperties` instance pointed to by `buffer`
    /// will be cleared.  Also note that for efficiency, application should
    /// fetch properties once and cache the value, instead of invoking this
    /// method multiple times on a message.
    int loadProperties(MessageProperties* buffer) const;

    /// Format this object to the specified output `stream` at the (absolute
    /// value of) the optionally specified indentation `level` and return a
    /// reference to `stream`.  If `level` is specified, optionally specify
    /// `spacesPerLevel`, the number of spaces per indentation level for
    /// this and all of its nested objects.  If `level` is negative,
    /// suppress indentation of the first line.  If `spacesPerLevel` is
    /// negative format the entire output on one line, suppressing all but
    /// the initial indentation (as governed by `level`).  If `stream` is
    /// not valid on entry, this operation has no effect.
    bsl::ostream&
    print(bsl::ostream& stream, int level = 0, int spacesPerLevel = 4) const;
};

// FREE OPERATORS

/// Format the specified `rhs` to the specified output `stream` on a single
/// line (equivalent to `rhs.print(stream, 0, -1)`) and return a reference to
/// the modifiable `stream`.
bsl::ostream& operator<<(bsl::ostream& stream, const Message& rhs);

// ============================================================================
//                             INLINE DEFINITIONS
// ============================================================================

// -------------
// class Message
// -------------

// Deprecated overload: forwards the specified blob 'data' unchanged to
// 'setDataRef' and returns whatever that call returns.
inline Message& Message::setData(const bdlbb::Blob* data)
{
    Message& result = this->setDataRef(data);
    return result;
}

// Deprecated overload: forwards the specified 'data' address and 'length'
// unchanged to 'setDataRef' and returns whatever that call returns.
inline Message& Message::setData(const char* data, size_t length)
{
    Message& result = this->setDataRef(data, length);
    return result;
}

// Conversion to 'bool': yields the result of 'isInitialized()'.
inline Message::operator bool() const
{
    const bool initialized = this->isInitialized();
    return initialized;
}

// -------------------------------
// class MessageConfirmationCookie
// -------------------------------

// Create an unset cookie: both the queue id and the GUID are
// value-initialized.
inline MessageConfirmationCookie::MessageConfirmationCookie()
: d_queueId()
, d_guid()
{
    // NOTHING
}

// Create a cookie holding copies of the specified 'queueId' and
// 'messageGUID'.
inline MessageConfirmationCookie::MessageConfirmationCookie(
    const bmqa::QueueId&     queueId,
    const bmqt::MessageGUID& messageGUID)
: d_queueId(queueId)
, d_guid(messageGUID)
{
    // NOTHING
}

// ACCESSORS
// Return a reference to the queue id stored in this cookie.
inline const QueueId& MessageConfirmationCookie::queueId() const
{
    return this->d_queueId;
}

// Return a reference to the message GUID stored in this cookie.
inline const bmqt::MessageGUID& MessageConfirmationCookie::messageGUID() const
{
    return this->d_guid;
}
}  // close package namespace

// -------------
// class Message
// -------------

// FREE OPERATORS
// Stream the specified 'rhs' to the specified 'stream' by delegating to
// 'Message::print' with indentation level 0 and a negative
// 'spacesPerLevel', which 'print' documents as single-line output.
inline bsl::ostream& bmqa::operator<<(bsl::ostream&        stream,
                                      const bmqa::Message& rhs)
{
    const int level          = 0;
    const int spacesPerLevel = -1;

    return rhs.print(stream, level, spacesPerLevel);
}

}  // close enterprise namespace

#endif
